-
Notifications
You must be signed in to change notification settings - Fork 897
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cluster Linking mirror topic priorities demo #265
base: master
Are you sure you want to change the base?
Cluster Linking mirror topic priorities demo #265
Conversation
|
// pre-processing the list of mirror topics to get the info that we care about (name, lag, status) | ||
const mirrorTopics = {}; | ||
const mirrorTopicsNamesArray = []; | ||
console.log(topicsArray.length + ' mirror topics found'); | ||
topicsArray.forEach((topic) => { | ||
const name = topic.mirror_topic_name; | ||
const maxLag = topic.mirror_lags.reduce((max, lagObj) => Math.max(max, lagObj.lag), 0); // we only care about the maximum lag among the partitions | ||
mirrorTopics[name] = { | ||
maxLag, | ||
mirrorStatus: topic.mirror_status, | ||
topicData: topic | ||
}; | ||
mirrorTopicsNamesArray.push(name); | ||
}); | ||
|
||
let shouldThrottle = false; | ||
let resumeTopics = []; // these topics are "safe" and we will call the "Resume" command on them to ensure they are not throttled | ||
for (let i = 0; i < priorities.length; i ++) | ||
{ | ||
const priorityTopics = priorities[i]; | ||
console.log('\nPriority Level ' + i +': ' + priorityTopics.join(', ')); | ||
// add this level of priority to the "safe" topics that we will let run | ||
resumeTopics = resumeTopics.concat(priorityTopics); | ||
|
||
priorityTopics.forEach((topicName) => { | ||
if (shouldThrottle) | ||
{ | ||
return; // we're already going to throttle so don't do any more logic | ||
} | ||
|
||
const topic = mirrorTopics[topicName]; | ||
if (topic.maxLag > lagThreshold) | ||
{ | ||
console.log('Found lagging topic: ' + topicName); | ||
shouldThrottle = true; // lag threshold has been reached; start to throttle lower priorities | ||
} | ||
if (topic.mirrorStatus === 'PAUSED') | ||
{ | ||
console.log('Found paused topic: ' + topicName); | ||
shouldThrottle = true; // unpause the priority levels one at a time, in order of priority | ||
} | ||
}); | ||
|
||
if (shouldThrottle) | ||
{ | ||
break; // break the loop at this priority level if we will throttle | ||
} | ||
} | ||
|
||
console.log('\n'); | ||
if (shouldThrottle) | ||
{ | ||
console.log('Important topics are lagging, starting to throttle topics.'); | ||
console.log('These topics have priority and will be allowed to mirror: ' + resumeTopics.join (', ')); | ||
// dispatch "resume" command on this list of topic names to ensure they are not paused | ||
alterMirrorTopics('resume', resumeTopics); | ||
|
||
// find all mirror topics that don't have priority | ||
const topicsToPause = mirrorTopicsNamesArray.map((topicName) => { | ||
if (resumeTopics.indexOf(topicName) >= 0) | ||
{ | ||
return null; | ||
} | ||
return topicName; | ||
}).filter((e) => e !== null); | ||
console.log('\nThese mirror topics are low priority and will be paused: ' + topicsToPause.join(', ')); | ||
alterMirrorTopics('pause', topicsToPause); | ||
} | ||
else | ||
{ | ||
console.log('No critical lag, will resume all topics'); | ||
// un-pause (resume) all topics to make sure nothing is throttled | ||
|
||
// dispatch "resume" command on all mirror topics on this link | ||
alterMirrorTopics('resume', mirrorTopicsNamesArray); | ||
} | ||
|
||
console.log('\n\n'); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like these should all be tabbed. suggest using a JS editor to auito-indent
There are two ways to run the prioritizing script: | ||
|
||
* `node apply-priorities.js` will do one run of the priorities | ||
* `./run-apply-priorities.sh` will run |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggest something more descriptive: will run until canceled, periodically checking if any mirroring should be throttled
* `node apply-priorities.js` will do one run of the priorities | ||
* `./run-apply-priorities.sh` will run |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggest a rename of the second script that makes it clear it runs continuously
if (topic.mirrorStatus === 'PAUSED') | ||
{ | ||
console.log('Found paused topic: ' + topicName); | ||
shouldThrottle = true; // unpause the priority levels one at a time, in order of priority |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the name shouldThrottle
is confusing since in this case it's un-throttling. maybe use two booleans shouldThrottle
and foundPausedTopic
, or use a more generic boolean like shouldChangeTopicStatus
since the block below might pause or resume
else | ||
{ | ||
console.log('No critical lag, will resume all topics'); | ||
// un-pause (resume) all topics to make sure nothing is throttled | ||
|
||
// dispatch "resume" command on all mirror topics on this link | ||
alterMirrorTopics('resume', mirrorTopicsNamesArray); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this ever not be a no-op? could it be deleted? (since we fall into the block above if a topic is paused). If it's a no-op, maybe leave the block but change the console log to No critical lag found at priority level i
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
No description provided.